home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- __revision__ = '$Id: streambase.py 652 2006-08-27 19:41:15Z jajcus $'
- __docformat__ = 'restructuredtext en'
- import libxml2
- import socket
- import os
- import time
- import random
- import threading
- import errno
- import logging
- from pyxmpp import xmlextra
- from pyxmpp.expdict import ExpiringDictionary
- from pyxmpp.utils import to_utf8
- from pyxmpp.stanza import Stanza
- from pyxmpp.error import StreamErrorNode
- from pyxmpp.iq import Iq
- from pyxmpp.presence import Presence
- from pyxmpp.message import Message
- from pyxmpp.jid import JID
- from pyxmpp import resolver
- from pyxmpp.stanzaprocessor import StanzaProcessor
- from pyxmpp.exceptions import StreamError, StreamEncryptionRequired, HostMismatch, ProtocolError
- from pyxmpp.exceptions import FatalStreamError, StreamParseError, StreamAuthenticationError
- STREAM_NS = 'http://etherx.jabber.org/streams'
- BIND_NS = 'urn:ietf:params:xml:ns:xmpp-bind'
-
- def stanza_factory(xmlnode, stream = None):
- if xmlnode.name == 'iq':
- return Iq(xmlnode, stream = stream)
-
- if xmlnode.name == 'message':
- return Message(xmlnode, stream = stream)
-
- if xmlnode.name == 'presence':
- return Presence(xmlnode, stream = stream)
- else:
- return Stanza(xmlnode, stream = stream)
-
-
- class StreamBase(StanzaProcessor, xmlextra.StreamHandler):
-
- def __init__(self, default_ns, extra_ns = (), keepalive = 0, owner = None):
- StanzaProcessor.__init__(self)
- xmlextra.StreamHandler.__init__(self)
- self.default_ns_uri = default_ns
- if extra_ns:
- self.extra_ns_uris = extra_ns
- else:
- self.extra_ns_uris = []
- self.keepalive = keepalive
- self._reader_lock = threading.Lock()
- self.process_all_stanzas = False
- self.port = None
- self._reset()
- self.owner = owner
- self._StreamBase__logger = logging.getLogger('pyxmpp.Stream')
-
-
- def _reset(self):
- self.doc_in = None
- self.doc_out = None
- self.socket = None
- self._reader = None
- self.addr = None
- self.default_ns = None
- self.extra_ns = { }
- self.stream_ns = None
- self._reader = None
- self.ioreader = None
- self.me = None
- self.peer = None
- self.skip = False
- self.stream_id = None
- self._iq_response_handlers = ExpiringDictionary()
- self._iq_get_handlers = { }
- self._iq_set_handlers = { }
- self._message_handlers = []
- self._presence_handlers = []
- self.eof = False
- self.initiator = None
- self.features = None
- self.authenticated = False
- self.peer_authenticated = False
- self.auth_method_used = None
- self.version = None
- self.last_keepalive = False
-
-
- def _connect_socket(self, sock, to = None):
- self.eof = 0
- self.socket = sock
- if to:
- self.peer = JID(to)
- else:
- self.peer = None
- self.initiator = 1
- self._send_stream_start()
- self._make_reader()
-
-
- def connect(self, addr, port, service = None, to = None):
- self.lock.acquire()
-
- try:
- return self._connect(addr, port, service, to)
- finally:
- self.lock.release()
-
-
-
- def _connect(self, addr, port, service = None, to = None):
- if to is None:
- to = str(addr)
-
- if service is not None:
- self.state_change('resolving srv', (addr, service))
- addrs = resolver.resolve_srv(addr, service)
- if not addrs:
- addrs = [
- (addr, port)]
-
- else:
- addrs = [
- (addr, port)]
- msg = None
- for addr, port in addrs:
- if type(addr) in (str, unicode):
- self.state_change('resolving', addr)
-
- s = None
- for res in resolver.getaddrinfo(addr, port, 0, socket.SOCK_STREAM):
- (family, socktype, proto, _unused, sockaddr) = res
-
- try:
- s = socket.socket(family, socktype, proto)
- self.state_change('connecting', sockaddr)
- s.connect(sockaddr)
- self.state_change('connected', sockaddr)
- except socket.error:
- msg = None
- self._StreamBase__logger.debug('Connect to %r failed' % (sockaddr,))
- if s:
- s.close()
- s = None
- continue
- continue
-
-
- if s:
- break
- continue
-
- if not s:
- if msg:
- raise socket.error, msg
- else:
- raise FatalStreamError, 'Cannot connect'
-
- self.addr = addr
- self.port = port
- self._connect_socket(s, to)
- self.last_keepalive = time.time()
-
-
- def accept(self, sock, myname):
- self.lock.acquire()
-
- try:
- return self._accept(sock, myname)
- finally:
- self.lock.release()
-
-
-
- def _accept(self, sock, myname):
- self.eof = 0
- (self.socket, addr) = sock.accept()
- self._StreamBase__logger.debug('Connection from: %r' % (addr,))
- (self.addr, self.port) = addr
- if myname:
- self.me = JID(myname)
- else:
- self.me = None
- self.initiator = 0
- self._make_reader()
- self.last_keepalive = time.time()
-
-
- def disconnect(self):
- self.lock.acquire()
-
- try:
- return self._disconnect()
- finally:
- self.lock.release()
-
-
-
- def _disconnect(self):
- if self.doc_out:
- self._send_stream_end()
-
-
-
- def _post_connect(self):
- pass
-
-
- def _post_auth(self):
- pass
-
-
- def state_change(self, state, arg):
- self._StreamBase__logger.debug('State: %s: %r' % (state, arg))
-
-
- def close(self):
- self.lock.acquire()
-
- try:
- return self._close()
- finally:
- self.lock.release()
-
-
-
- def _close(self):
- self._disconnect()
- if self.doc_in:
- self.doc_in = None
-
- if self.features:
- self.features = None
-
- self._reader = None
- self.stream_id = None
- if self.socket:
- self.socket.close()
-
- self._reset()
-
-
- def _make_reader(self):
- self._reader = xmlextra.StreamReader(self)
-
-
- def stream_start(self, doc):
- self.doc_in = doc
- self._StreamBase__logger.debug('input document: %r' % (self.doc_in.serialize(),))
-
- try:
- r = self.doc_in.getRootElement()
- if r.ns().getContent() != STREAM_NS:
- self._send_stream_error('invalid-namespace')
- raise FatalStreamError, 'Invalid namespace.'
- except libxml2.treeError:
- self._send_stream_error('invalid-namespace')
- raise FatalStreamError, "Couldn't get the namespace."
-
- self.version = r.prop('version')
- if self.version and self.version != '1.0':
- self._send_stream_error('unsupported-version')
- raise FatalStreamError, 'Unsupported protocol version.'
-
- to_from_mismatch = 0
- if self.initiator:
- self.stream_id = r.prop('id')
- peer = r.prop('from')
- if peer:
- peer = JID(peer)
-
- if self.peer:
- if peer and peer != self.peer:
- self._StreamBase__logger.debug('peer hostname mismatch: %r != %r' % (peer, self.peer))
- to_from_mismatch = 1
-
- else:
- self.peer = peer
- else:
- to = r.prop('to')
- if to:
- to = self.check_to(to)
- if not to:
- self._send_stream_error('host-unknown')
- raise FatalStreamError, 'Bad "to"'
-
- self.me = JID(to)
-
- self._send_stream_start(self.generate_id())
- self._send_stream_features()
- self.state_change('fully connected', self.peer)
- self._post_connect()
- if not self.version:
- self.state_change('fully connected', self.peer)
- self._post_connect()
-
- if to_from_mismatch:
- raise HostMismatch
-
-
-
- def stream_end(self, _unused):
- self._StreamBase__logger.debug('Stream ended')
- self.eof = 1
- if self.doc_out:
- self._send_stream_end()
-
- if self.doc_in:
- self.doc_in = None
- self._reader = None
- if self.features:
- self.features = None
-
-
- self.state_change('disconnected', self.peer)
-
-
- def stanza_start(self, doc, node):
- pass
-
-
- def stanza(self, _unused, node):
- self._process_node(node)
-
-
- def error(self, descr):
- raise StreamParseError, descr
-
-
- def _send_stream_end(self):
- self.doc_out.getRootElement().addContent(' ')
- s = self.doc_out.getRootElement().serialize(encoding = 'UTF-8')
- end = s.rindex('<')
-
- try:
- self._write_raw(s[end:])
- except (IOError, SystemError, socket.error):
- e = None
- self._StreamBase__logger.debug('Sending stream closing tag failed:' + str(e))
-
- self.doc_out.freeDoc()
- self.doc_out = None
- if self.features:
- self.features = None
-
-
-
- def _send_stream_start(self, sid = None):
- if self.doc_out:
- raise StreamError, 'Stream start already sent'
-
- self.doc_out = libxml2.newDoc('1.0')
- root = self.doc_out.newChild(None, 'stream', None)
- self.stream_ns = root.newNs(STREAM_NS, 'stream')
- root.setNs(self.stream_ns)
- self.default_ns = root.newNs(self.default_ns_uri, None)
- for prefix, uri in self.extra_ns:
- self.extra_ns[uri] = root.newNs(uri, prefix)
-
- if self.peer and self.initiator:
- root.setProp('to', self.peer.as_utf8())
-
- if self.me and not (self.initiator):
- root.setProp('from', self.me.as_utf8())
-
- root.setProp('version', '1.0')
- if sid:
- root.setProp('id', sid)
- self.stream_id = sid
-
- sr = self.doc_out.serialize(encoding = 'UTF-8')
- self._write_raw(sr[:sr.find('/>')] + '>')
-
-
- def _send_stream_error(self, condition):
- if not self.doc_out:
- self._send_stream_start()
-
- e = StreamErrorNode(condition)
- e.xmlnode.setNs(self.stream_ns)
- self._write_raw(e.serialize())
- e.free()
- self._send_stream_end()
-
-
- def _restart_stream(self):
- self._reader = None
- self.doc_out = None
- self.doc_in = None
- self.features = None
- if self.initiator:
- self._send_stream_start(self.stream_id)
-
- self._make_reader()
-
-
- def _make_stream_features(self):
- root = self.doc_out.getRootElement()
- features = root.newChild(root.ns(), 'features', None)
- return features
-
-
- def _send_stream_features(self):
- self.features = self._make_stream_features()
- self._write_raw(self.features.serialize(encoding = 'UTF-8'))
-
-
- def write_raw(self, data):
- self.lock.acquire()
-
- try:
- return self._write_raw(data)
- finally:
- self.lock.release()
-
-
-
- def _write_raw(self, data):
- logging.getLogger('pyxmpp.Stream.out').debug('OUT: %r', data)
-
- try:
- self.socket.send(data)
- except (IOError, OSError, socket.error):
- e = None
- raise FatalStreamError('IO Error: ' + str(e))
-
-
-
- def _write_node(self, xmlnode):
- if self.eof and not (self.socket) or not (self.doc_out):
- self._StreamBase__logger.debug('Dropping stanza: %r' % (xmlnode,))
- return None
-
- xmlnode = xmlnode.docCopyNode(self.doc_out, 1)
- self.doc_out.addChild(xmlnode)
-
- try:
- ns = xmlnode.ns()
- except libxml2.treeError:
- ns = None
-
- if ns and ns.content == xmlextra.COMMON_NS:
- xmlextra.replace_ns(xmlnode, ns, self.default_ns)
-
- s = xmlextra.safe_serialize(xmlnode)
- self._write_raw(s)
- xmlnode.unlinkNode()
- xmlnode.freeNode()
-
-
- def send(self, stanza):
- self.lock.acquire()
-
- try:
- return self._send(stanza)
- finally:
- self.lock.release()
-
-
-
- def _send(self, stanza):
- if not self.version:
-
- try:
- err = stanza.get_error()
- except ProtocolError:
- err = None
-
- if err:
- err.downgrade()
-
-
- self.fix_out_stanza(stanza)
- self._write_node(stanza.xmlnode)
-
-
- def idle(self):
- self.lock.acquire()
-
- try:
- return self._idle()
- finally:
- self.lock.release()
-
-
-
- def _idle(self):
- self._iq_response_handlers.expire()
- if not (self.socket) or self.eof:
- return None
-
- now = time.time()
- if self.keepalive and now - self.last_keepalive >= self.keepalive:
- self._write_raw(' ')
- self.last_keepalive = now
-
-
-
- def fileno(self):
- self.lock.acquire()
-
- try:
- return self.socket.fileno()
- finally:
- self.lock.release()
-
-
-
- def loop(self, timeout):
- self.lock.acquire()
-
- try:
- while not (self.eof) and self.socket is not None:
- act = self._loop_iter(timeout)
- if not act:
- self._idle()
- continue
- finally:
- self.lock.release()
-
-
-
- def loop_iter(self, timeout):
- self.lock.acquire()
-
- try:
- return self._loop_iter(timeout)
- finally:
- self.lock.release()
-
-
-
- def _loop_iter(self, timeout):
- import select
- self.lock.release()
-
- try:
- (ifd, _unused, efd) = select.select([
- self.socket], [], [
- self.socket], timeout)
- except select.error:
- e = None
- if e.args[0] != errno.EINTR:
- raise
-
- ifd = []
- _unused = []
- efd = []
- finally:
- self.lock.acquire()
-
- if self.socket in ifd or self.socket in efd:
- self._process()
- return True
- else:
- return False
-
-
- def process(self):
- self.lock.acquire()
-
- try:
- self._process()
- finally:
- self.lock.release()
-
-
-
- def _process(self):
-
- try:
-
- try:
- self._read()
- except (xmlextra.error,):
- e = None
- self._StreamBase__logger.exception('Exception during read()')
- raise StreamParseError(unicode(e))
- except:
- raise
-
- except (IOError, OSError, socket.error):
- e = None
- self.close()
- raise FatalStreamError('IO Error: ' + str(e))
- except (FatalStreamError, KeyboardInterrupt, SystemExit):
- e = None
- self.close()
- raise
-
-
-
- def _read(self):
- self._StreamBase__logger.debug('StreamBase._read(), socket: %r', self.socket)
- if self.eof:
- return None
-
-
- try:
- r = self.socket.recv(1024)
- except socket.error:
- e = None
- if e.args[0] != errno.EINTR:
- raise
-
- return None
-
- self._feed_reader(r)
-
-
- def _feed_reader(self, data):
- logging.getLogger('pyxmpp.Stream.in').debug('IN: %r', data)
- if data:
-
- try:
- r = self._reader.feed(data)
- while r:
- r = self._reader.feed('')
- if r is None:
- self.eof = 1
- self.disconnect()
- except StreamParseError:
- self._send_stream_error('xml-not-well-formed')
- raise
- except:
- None<EXCEPTION MATCH>StreamParseError
-
-
- None<EXCEPTION MATCH>StreamParseError
- self.eof = 1
- self.disconnect()
- if self.eof:
- self.stream_end(None)
-
-
-
- def _process_node(self, xmlnode):
- ns_uri = xmlnode.ns().getContent()
- if ns_uri == 'http://etherx.jabber.org/streams':
- self._process_stream_node(xmlnode)
- return None
-
- if ns_uri == self.default_ns_uri:
- stanza = stanza_factory(xmlnode, self)
- self.lock.release()
-
- try:
- self.process_stanza(stanza)
- finally:
- self.lock.acquire()
- stanza.free()
-
- else:
- self._StreamBase__logger.debug('Unhandled node: %r' % (xmlnode.serialize(),))
-
-
- def _process_stream_node(self, xmlnode):
- if xmlnode.name == 'error':
- e = StreamErrorNode(xmlnode)
- self.lock.release()
-
- try:
- self.process_stream_error(e)
- finally:
- self.lock.acquire()
- e.free()
-
- return None
- elif xmlnode.name == 'features':
- self._StreamBase__logger.debug('Got stream features')
- self._StreamBase__logger.debug('Node: %r' % (xmlnode,))
- self.features = xmlnode.copyNode(1)
- self.doc_in.addChild(self.features)
- self._got_features()
- return None
-
- self._StreamBase__logger.debug('Unhandled stream node: %r' % (xmlnode.serialize(),))
-
-
- def process_stream_error(self, err):
- self._StreamBase__logger.debug('Unhandled stream error: condition: %s %r' % (err.get_condition().name, err.serialize()))
-
-
- def check_to(self, to):
- if to != self.me:
- return None
-
- return to
-
-
- def generate_id(self):
- return '%i-%i-%s' % (os.getpid(), time.time(), str(random.random())[2:])
-
-
- def _got_features(self):
- ctxt = self.doc_in.xpathNewContext()
- ctxt.setContextNode(self.features)
- ctxt.xpathRegisterNs('stream', STREAM_NS)
- ctxt.xpathRegisterNs('bind', BIND_NS)
- bind_n = None
-
- try:
- bind_n = ctxt.xpathEval('bind:bind')
- finally:
- ctxt.xpathFreeContext()
-
- if self.authenticated:
- if bind_n:
- self.bind(self.me.resource)
- else:
- self.state_change('authorized', self.me)
-
-
-
- def bind(self, resource):
- iq = Iq(stanza_type = 'set')
- q = iq.new_query(BIND_NS, u'bind')
- if resource:
- q.newTextChild(None, 'resource', to_utf8(resource))
-
- self.state_change('binding', resource)
- self.set_response_handlers(iq, self._bind_success, self._bind_error)
- self.send(iq)
- iq.free()
-
-
- def _bind_success(self, stanza):
- jid_n = stanza.xpath_eval('bind:bind/bind:jid', {
- 'bind': BIND_NS })
- if jid_n:
- self.me = JID(jid_n[0].getContent().decode('utf-8'))
-
- self.state_change('authorized', self.me)
-
-
- def _bind_error(self, stanza):
- raise FatalStreamError, 'Resource binding failed'
-
-
- def connected(self):
- if self.doc_in and self.doc_out and not (self.eof):
- return True
- else:
- return False
-
-
-